/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.channel;

import io.netty.buffer.ByteBufAllocator;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.netty.util.DefaultAttributeMap;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ResourceLeakHint;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.OrderedEventExecutor;
import io.netty.util.internal.PromiseNotificationUtil;
import io.netty.util.internal.ThrowableUtil;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint
{

    private static final InternalLogger logger = InternalLoggerFactory
            .getInstance(AbstractChannelHandlerContext.class);

    // Neighbouring contexts: inbound lookups walk along next, outbound
    // lookups walk along prev (see findContextInbound()/findContextOutbound()).
    volatile AbstractChannelHandlerContext next;

    volatile AbstractChannelHandlerContext prev;

    // Atomic updater for the volatile handlerState field declared below.
    private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER = AtomicIntegerFieldUpdater
            .newUpdater(AbstractChannelHandlerContext.class, "handlerState");

    /**
     * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be
     * called.
     */
    private static final int ADD_PENDING = 1;

    /**
     * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
     */
    private static final int ADD_COMPLETE = 2;

    /**
     * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
     */
    private static final int REMOVE_COMPLETE = 3;

    /**
     * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} nor
     * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
     */
    private static final int INIT = 0;

    // Whether findContextInbound() may select this context.
    private final boolean inbound;

    // Whether findContextOutbound() may select this context.
    private final boolean outbound;

    private final DefaultChannelPipeline pipeline;

    private final String name;

    // True when no child executor is set or when the child executor is an
    // OrderedEventExecutor (computed once in the constructor).
    private final boolean ordered;

    // Set to null if no child executor should be used, otherwise set to the
    // child executor. executor() falls back to the channel's event loop when
    // this is null.
    final EventExecutor executor;

    // Lazily created and cached by newSucceededFuture().
    private ChannelFuture succeededFuture;

    // Lazily instantiated tasks used to trigger events to a handler with a
    // different executor.
    // There is no need to make these volatile: at worst a few more instances
    // than needed will be created.
    private Runnable invokeChannelReadCompleteTask;

    private Runnable invokeReadTask;

    private Runnable invokeChannelWritableStateChangedTask;

    private Runnable invokeFlushTask;

    // One of INIT, ADD_PENDING, ADD_COMPLETE or REMOVE_COMPLETE.
    private volatile int handlerState = INIT;

    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline,
            EventExecutor executor, String name, boolean inbound,
            boolean outbound)
    {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.inbound = inbound;
        this.outbound = outbound;
        // Its ordered if its driven by the EventLoop or the given Executor is
        // an instanceof OrderedEventExecutor.
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

    /**
     * Returns the {@link Channel} of the pipeline this context belongs to.
     */
    @Override
    public Channel channel()
    {
        final DefaultChannelPipeline owner = this.pipeline;
        return owner.channel();
    }

    /**
     * Returns the pipeline this context was created with.
     */
    @Override
    public ChannelPipeline pipeline()
    {
        return this.pipeline;
    }

    /**
     * Returns the allocator configured on the channel of this context.
     */
    @Override
    public ByteBufAllocator alloc()
    {
        final Channel ch = channel();
        return ch.config().getAllocator();
    }

    /**
     * Returns the child executor when one was supplied at construction time,
     * otherwise the event loop of the channel.
     */
    @Override
    public EventExecutor executor()
    {
        final EventExecutor child = executor;
        if (child != null)
        {
            return child;
        }
        return channel().eventLoop();
    }

    /**
     * Returns the name this context was created with.
     */
    @Override
    public String name()
    {
        return this.name;
    }

    /**
     * Passes a channel-registered event to the next inbound context.
     *
     * @return this context
     */
    @Override
    public ChannelHandlerContext fireChannelRegistered()
    {
        final AbstractChannelHandlerContext target = findContextInbound();
        invokeChannelRegistered(target);
        return this;
    }

    /**
     * Delivers a channel-registered event to {@code next}: directly when the
     * calling thread is in the event loop of {@code next}'s executor,
     * otherwise as a task submitted to that executor.
     *
     * @param next the context that should receive the event
     */
    static void invokeChannelRegistered(
            final AbstractChannelHandlerContext next)
    {
        final EventExecutor loop = next.executor();
        if (!loop.inEventLoop())
        {
            loop.execute(new Runnable()
            {
                @Override
                public void run()
                {
                    next.invokeChannelRegistered();
                }
            });
            return;
        }
        next.invokeChannelRegistered();
    }

    /**
     * Calls {@code channelRegistered} on this context's handler when
     * {@code invokeHandler()} returns true; otherwise forwards the event with
     * {@link #fireChannelRegistered()}. Anything the handler throws is passed
     * to {@code notifyHandlerException}.
     */
    private void invokeChannelRegistered()
    {
        if (!invokeHandler())
        {
            fireChannelRegistered();
            return;
        }
        try
        {
            final ChannelInboundHandler target = (ChannelInboundHandler) handler();
            target.channelRegistered(this);
        }
        catch (Throwable failure)
        {
            notifyHandlerException(failure);
        }
    }

    /**
     * Passes a channel-unregistered event to the next inbound context.
     *
     * @return this context
     */
    @Override
    public ChannelHandlerContext fireChannelUnregistered()
    {
        final AbstractChannelHandlerContext target = findContextInbound();
        invokeChannelUnregistered(target);
        return this;
    }

    /**
     * Delivers a channel-unregistered event to {@code next}: directly when the
     * calling thread is in the event loop of {@code next}'s executor,
     * otherwise as a task submitted to that executor.
     *
     * @param next the context that should receive the event
     */
    static void invokeChannelUnregistered(
            final AbstractChannelHandlerContext next)
    {
        final EventExecutor loop = next.executor();
        if (!loop.inEventLoop())
        {
            loop.execute(new Runnable()
            {
                @Override
                public void run()
                {
                    next.invokeChannelUnregistered();
                }
            });
            return;
        }
        next.invokeChannelUnregistered();
    }

    /**
     * Calls {@code channelUnregistered} on this context's handler when
     * {@code invokeHandler()} returns true; otherwise forwards the event with
     * {@link #fireChannelUnregistered()}. Anything the handler throws is
     * passed to {@code notifyHandlerException}.
     */
    private void invokeChannelUnregistered()
    {
        if (!invokeHandler())
        {
            fireChannelUnregistered();
            return;
        }
        try
        {
            final ChannelInboundHandler target = (ChannelInboundHandler) handler();
            target.channelUnregistered(this);
        }
        catch (Throwable failure)
        {
            notifyHandlerException(failure);
        }
    }

    /**
     * Passes a channel-active event to the next inbound context.
     *
     * @return this context
     */
    @Override
    public ChannelHandlerContext fireChannelActive()
    {
        final AbstractChannelHandlerContext target = findContextInbound();
        invokeChannelActive(target);
        return this;
    }

    /**
     * Delivers a channel-active event to {@code next}: directly when the
     * calling thread is in the event loop of {@code next}'s executor,
     * otherwise as a task submitted to that executor.
     *
     * @param next the context that should receive the event
     */
    static void invokeChannelActive(final AbstractChannelHandlerContext next)
    {
        final EventExecutor loop = next.executor();
        if (!loop.inEventLoop())
        {
            loop.execute(new Runnable()
            {
                @Override
                public void run()
                {
                    next.invokeChannelActive();
                }
            });
            return;
        }
        next.invokeChannelActive();
    }

    /**
     * Calls {@code channelActive} on this context's handler when
     * {@code invokeHandler()} returns true; otherwise forwards the event with
     * {@link #fireChannelActive()}. Anything the handler throws is passed to
     * {@code notifyHandlerException}.
     */
    private void invokeChannelActive()
    {
        if (!invokeHandler())
        {
            fireChannelActive();
            return;
        }
        try
        {
            final ChannelInboundHandler target = (ChannelInboundHandler) handler();
            target.channelActive(this);
        }
        catch (Throwable failure)
        {
            notifyHandlerException(failure);
        }
    }

    /**
     * Passes a channel-inactive event to the next inbound context.
     *
     * @return this context
     */
    @Override
    public ChannelHandlerContext fireChannelInactive()
    {
        final AbstractChannelHandlerContext target = findContextInbound();
        invokeChannelInactive(target);
        return this;
    }

    /**
     * Delivers a channel-inactive event to {@code next}: directly when the
     * calling thread is in the event loop of {@code next}'s executor,
     * otherwise as a task submitted to that executor.
     *
     * @param next the context that should receive the event
     */
    static void invokeChannelInactive(final AbstractChannelHandlerContext next)
    {
        final EventExecutor loop = next.executor();
        if (!loop.inEventLoop())
        {
            loop.execute(new Runnable()
            {
                @Override
                public void run()
                {
                    next.invokeChannelInactive();
                }
            });
            return;
        }
        next.invokeChannelInactive();
    }

    /**
     * Calls {@code channelInactive} on this context's handler when
     * {@code invokeHandler()} returns true; otherwise forwards the event with
     * {@link #fireChannelInactive()}. Anything the handler throws is passed to
     * {@code notifyHandlerException}.
     */
    private void invokeChannelInactive()
    {
        if (!invokeHandler())
        {
            fireChannelInactive();
            return;
        }
        try
        {
            final ChannelInboundHandler target = (ChannelInboundHandler) handler();
            target.channelInactive(this);
        }
        catch (Throwable failure)
        {
            notifyHandlerException(failure);
        }
    }

    /**
     * Passes an exception-caught event to the context that directly follows
     * this one. Unlike the other fire methods, no inbound lookup is done.
     *
     * @param cause the throwable to propagate
     * @return this context
     */
    @Override
    public ChannelHandlerContext fireExceptionCaught(final Throwable cause)
    {
        final AbstractChannelHandlerContext successor = this.next;
        invokeExceptionCaught(successor, cause);
        return this;
    }

    /**
     * Delivers an exception-caught event to {@code next}: directly when the
     * calling thread is in the event loop of {@code next}'s executor,
     * otherwise as a task submitted to that executor. A failure to submit the
     * task is logged at WARN level together with the original cause and is not
     * rethrown.
     *
     * @param next the context that should receive the event
     * @param cause the throwable to deliver; must not be {@code null}
     * @throws NullPointerException if {@code cause} is {@code null}
     */
    static void invokeExceptionCaught(final AbstractChannelHandlerContext next,
            final Throwable cause)
    {
        ObjectUtil.checkNotNull(cause, "cause");
        final EventExecutor loop = next.executor();
        if (loop.inEventLoop())
        {
            next.invokeExceptionCaught(cause);
            return;
        }

        try
        {
            loop.execute(new Runnable()
            {
                @Override
                public void run()
                {
                    next.invokeExceptionCaught(cause);
                }
            });
        }
        catch (Throwable submitFailure)
        {
            if (!logger.isWarnEnabled())
            {
                return;
            }
            logger.warn("Failed to submit an exceptionCaught() event.",
                    submitFailure);
            logger.warn(
                    "The exceptionCaught() event that was failed to submit was:",
                    cause);
        }
    }

    /**
     * Calls {@code exceptionCaught} on this context's handler when
     * {@code invokeHandler()} returns true; otherwise forwards the event with
     * {@link #fireExceptionCaught(Throwable)}. A throwable raised by the
     * handler itself is only logged: at DEBUG level with its full stack trace,
     * or else at WARN level.
     *
     * @param cause the throwable being handled
     */
    private void invokeExceptionCaught(final Throwable cause)
    {
        if (!invokeHandler())
        {
            fireExceptionCaught(cause);
            return;
        }

        try
        {
            handler().exceptionCaught(this, cause);
        }
        catch (Throwable error)
        {
            if (logger.isDebugEnabled())
            {
                final String errorTrace = ThrowableUtil
                        .stackTraceToString(error);
                logger.debug(
                        "An exception {}"
                                + "was thrown by a user handler's exceptionCaught() "
                                + "method while handling the following exception:",
                        errorTrace, cause);
                return;
            }
            if (logger.isWarnEnabled())
            {
                logger.warn(
                        "An exception '{}' [enable DEBUG level for full stacktrace] "
                                + "was thrown by a user handler's exceptionCaught() "
                                + "method while handling the following exception:",
                        error, cause);
            }
        }
    }

    /**
     * Passes a user event to the next inbound context.
     *
     * @param event the user event to propagate
     * @return this context
     */
    @Override
    public ChannelHandlerContext fireUserEventTriggered(final Object event)
    {
        final AbstractChannelHandlerContext target = findContextInbound();
        invokeUserEventTriggered(target, event);
        return this;
    }

    /**
     * Delivers a user event to {@code next}: directly when the calling thread
     * is in the event loop of {@code next}'s executor, otherwise as a task
     * submitted to that executor.
     *
     * @param next the context that should receive the event
     * @param event the user event; must not be {@code null}
     * @throws NullPointerException if {@code event} is {@code null}
     */
    static void invokeUserEventTriggered(
            final AbstractChannelHandlerContext next, final Object event)
    {
        ObjectUtil.checkNotNull(event, "event");
        final EventExecutor loop = next.executor();
        if (!loop.inEventLoop())
        {
            loop.execute(new Runnable()
            {
                @Override
                public void run()
                {
                    next.invokeUserEventTriggered(event);
                }
            });
            return;
        }
        next.invokeUserEventTriggered(event);
    }

    /**
     * Calls {@code userEventTriggered} on this context's handler when
     * {@code invokeHandler()} returns true; otherwise forwards the event with
     * {@link #fireUserEventTriggered(Object)}. Anything the handler throws is
     * passed to {@code notifyHandlerException}.
     *
     * @param event the user event to deliver
     */
    private void invokeUserEventTriggered(Object event)
    {
        if (!invokeHandler())
        {
            fireUserEventTriggered(event);
            return;
        }
        try
        {
            final ChannelInboundHandler target = (ChannelInboundHandler) handler();
            target.userEventTriggered(this, event);
        }
        catch (Throwable failure)
        {
            notifyHandlerException(failure);
        }
    }

    /**
     * Passes a received message to the next inbound context.
     *
     * @param msg the message to propagate
     * @return this context
     */
    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg)
    {
        final AbstractChannelHandlerContext target = findContextInbound();
        invokeChannelRead(target, msg);
        return this;
    }

    /**
     * Delivers a message to {@code next} after passing it through the
     * pipeline's {@code touch} hook: directly when the calling thread is in
     * the event loop of {@code next}'s executor, otherwise as a task submitted
     * to that executor.
     *
     * @param next the context that should receive the message
     * @param msg the message; must not be {@code null}
     * @throws NullPointerException if {@code msg} is {@code null}
     */
    static void invokeChannelRead(final AbstractChannelHandlerContext next,
            Object msg)
    {
        final Object touched = next.pipeline
                .touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        final EventExecutor loop = next.executor();
        if (!loop.inEventLoop())
        {
            loop.execute(new Runnable()
            {
                @Override
                public void run()
                {
                    next.invokeChannelRead(touched);
                }
            });
            return;
        }
        next.invokeChannelRead(touched);
    }

    /**
     * Calls {@code channelRead} on this context's handler when
     * {@code invokeHandler()} returns true; otherwise forwards the message
     * with {@link #fireChannelRead(Object)}. Anything the handler throws is
     * passed to {@code notifyHandlerException}.
     *
     * @param msg the message to deliver
     */
    private void invokeChannelRead(Object msg)
    {
        if (!invokeHandler())
        {
            fireChannelRead(msg);
            return;
        }
        try
        {
            final ChannelInboundHandler target = (ChannelInboundHandler) handler();
            target.channelRead(this, msg);
        }
        catch (Throwable failure)
        {
            notifyHandlerException(failure);
        }
    }

    /**
     * Passes a read-complete event to the next inbound context.
     *
     * @return this context
     */
    @Override
    public ChannelHandlerContext fireChannelReadComplete()
    {
        final AbstractChannelHandlerContext target = findContextInbound();
        invokeChannelReadComplete(target);
        return this;
    }

    /**
     * Delivers a read-complete event to {@code next}: directly when the
     * calling thread is in the event loop of {@code next}'s executor,
     * otherwise through a task that is created on first use, cached on
     * {@code next} and submitted to that executor.
     *
     * @param next the context that should receive the event
     */
    static void invokeChannelReadComplete(
            final AbstractChannelHandlerContext next)
    {
        final EventExecutor loop = next.executor();
        if (loop.inEventLoop())
        {
            next.invokeChannelReadComplete();
            return;
        }

        Runnable cached = next.invokeChannelReadCompleteTask;
        if (cached == null)
        {
            cached = new Runnable()
            {
                @Override
                public void run()
                {
                    next.invokeChannelReadComplete();
                }
            };
            next.invokeChannelReadCompleteTask = cached;
        }
        loop.execute(cached);
    }

    /**
     * Calls {@code channelReadComplete} on this context's handler when
     * {@code invokeHandler()} returns true; otherwise forwards the event with
     * {@link #fireChannelReadComplete()}. Anything the handler throws is
     * passed to {@code notifyHandlerException}.
     */
    private void invokeChannelReadComplete()
    {
        if (!invokeHandler())
        {
            fireChannelReadComplete();
            return;
        }
        try
        {
            final ChannelInboundHandler target = (ChannelInboundHandler) handler();
            target.channelReadComplete(this);
        }
        catch (Throwable failure)
        {
            notifyHandlerException(failure);
        }
    }

    /**
     * Passes a writability-changed event to the next inbound context.
     *
     * @return this context
     */
    @Override
    public ChannelHandlerContext fireChannelWritabilityChanged()
    {
        final AbstractChannelHandlerContext target = findContextInbound();
        invokeChannelWritabilityChanged(target);
        return this;
    }

    /**
     * Delivers a writability-changed event to {@code next}: directly when the
     * calling thread is in the event loop of {@code next}'s executor,
     * otherwise through a task that is created on first use, cached on
     * {@code next} and submitted to that executor.
     *
     * @param next the context that should receive the event
     */
    static void invokeChannelWritabilityChanged(
            final AbstractChannelHandlerContext next)
    {
        final EventExecutor loop = next.executor();
        if (loop.inEventLoop())
        {
            next.invokeChannelWritabilityChanged();
            return;
        }

        Runnable cached = next.invokeChannelWritableStateChangedTask;
        if (cached == null)
        {
            cached = new Runnable()
            {
                @Override
                public void run()
                {
                    next.invokeChannelWritabilityChanged();
                }
            };
            next.invokeChannelWritableStateChangedTask = cached;
        }
        loop.execute(cached);
    }

    /**
     * Calls {@code channelWritabilityChanged} on this context's handler when
     * {@code invokeHandler()} returns true; otherwise forwards the event with
     * {@link #fireChannelWritabilityChanged()}. Anything the handler throws is
     * passed to {@code notifyHandlerException}.
     */
    private void invokeChannelWritabilityChanged()
    {
        if (!invokeHandler())
        {
            fireChannelWritabilityChanged();
            return;
        }
        try
        {
            final ChannelInboundHandler target = (ChannelInboundHandler) handler();
            target.channelWritabilityChanged(this);
        }
        catch (Throwable failure)
        {
            notifyHandlerException(failure);
        }
    }

    /**
     * Binds to {@code localAddress} using a freshly created promise.
     *
     * @param localAddress the address to bind to
     * @return the promise that was created for the operation
     */
    @Override
    public ChannelFuture bind(SocketAddress localAddress)
    {
        final ChannelPromise promise = newPromise();
        return bind(localAddress, promise);
    }

    /**
     * Connects to {@code remoteAddress} using a freshly created promise.
     *
     * @param remoteAddress the address to connect to
     * @return the promise that was created for the operation
     */
    @Override
    public ChannelFuture connect(SocketAddress remoteAddress)
    {
        final ChannelPromise promise = newPromise();
        return connect(remoteAddress, promise);
    }

    /**
     * Connects to {@code remoteAddress} from {@code localAddress} using a
     * freshly created promise.
     *
     * @param remoteAddress the address to connect to
     * @param localAddress the local address to connect from
     * @return the promise that was created for the operation
     */
    @Override
    public ChannelFuture connect(SocketAddress remoteAddress,
            SocketAddress localAddress)
    {
        final ChannelPromise promise = newPromise();
        return connect(remoteAddress, localAddress, promise);
    }

    /**
     * Disconnects using a freshly created promise.
     *
     * @return the promise that was created for the operation
     */
    @Override
    public ChannelFuture disconnect()
    {
        final ChannelPromise promise = newPromise();
        return disconnect(promise);
    }

    /**
     * Closes using a freshly created promise.
     *
     * @return the promise that was created for the operation
     */
    @Override
    public ChannelFuture close()
    {
        final ChannelPromise promise = newPromise();
        return close(promise);
    }

    /**
     * Deregisters using a freshly created promise.
     *
     * @return the promise that was created for the operation
     */
    @Override
    public ChannelFuture deregister()
    {
        final ChannelPromise promise = newPromise();
        return deregister(promise);
    }

    /**
     * Requests a bind on the next outbound context: directly when the calling
     * thread is in that context's executor event loop, otherwise via
     * {@code safeExecute} on that executor.
     *
     * @param localAddress the address to bind to; must not be {@code null}
     * @param promise the promise for the operation
     * @return {@code promise}
     * @throws NullPointerException if {@code localAddress} is {@code null}
     */
    @Override
    public ChannelFuture bind(final SocketAddress localAddress,
            final ChannelPromise promise)
    {
        ObjectUtil.checkNotNull(localAddress, "localAddress");
        if (isNotValidPromise(promise, false))
        {
            // The promise was cancelled, so the operation is not performed.
            return promise;
        }

        final AbstractChannelHandlerContext target = findContextOutbound();
        final EventExecutor loop = target.executor();
        if (loop.inEventLoop())
        {
            target.invokeBind(localAddress, promise);
            return promise;
        }

        safeExecute(loop, new Runnable()
        {
            @Override
            public void run()
            {
                target.invokeBind(localAddress, promise);
            }
        }, promise, null);
        return promise;
    }

    /**
     * Calls {@code bind} on this context's handler when
     * {@code invokeHandler()} returns true; otherwise passes the request on
     * with {@link #bind(SocketAddress, ChannelPromise)}. Anything the handler
     * throws is reported through {@code notifyOutboundHandlerException}.
     *
     * @param localAddress the address to bind to
     * @param promise the promise for the operation
     */
    private void invokeBind(SocketAddress localAddress, ChannelPromise promise)
    {
        if (!invokeHandler())
        {
            bind(localAddress, promise);
            return;
        }
        try
        {
            final ChannelOutboundHandler target = (ChannelOutboundHandler) handler();
            target.bind(this, localAddress, promise);
        }
        catch (Throwable failure)
        {
            notifyOutboundHandlerException(failure, promise);
        }
    }

    /**
     * Connects to {@code remoteAddress} without a local address.
     *
     * @param remoteAddress the address to connect to
     * @param promise the promise for the operation
     * @return the future returned by the three-argument overload
     */
    @Override
    public ChannelFuture connect(SocketAddress remoteAddress,
            ChannelPromise promise)
    {
        final SocketAddress noLocalAddress = null;
        return connect(remoteAddress, noLocalAddress, promise);
    }

    /**
     * Requests a connect on the next outbound context: directly when the
     * calling thread is in that context's executor event loop, otherwise via
     * {@code safeExecute} on that executor.
     *
     * @param remoteAddress the address to connect to; must not be {@code null}
     * @param localAddress the local address to connect from
     * @param promise the promise for the operation
     * @return {@code promise}
     * @throws NullPointerException if {@code remoteAddress} is {@code null}
     */
    @Override
    public ChannelFuture connect(final SocketAddress remoteAddress,
            final SocketAddress localAddress, final ChannelPromise promise)
    {
        ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
        if (isNotValidPromise(promise, false))
        {
            // The promise was cancelled, so the operation is not performed.
            return promise;
        }

        final AbstractChannelHandlerContext target = findContextOutbound();
        final EventExecutor loop = target.executor();
        if (loop.inEventLoop())
        {
            target.invokeConnect(remoteAddress, localAddress, promise);
            return promise;
        }

        safeExecute(loop, new Runnable()
        {
            @Override
            public void run()
            {
                target.invokeConnect(remoteAddress, localAddress, promise);
            }
        }, promise, null);
        return promise;
    }

    /**
     * Calls {@code connect} on this context's handler when
     * {@code invokeHandler()} returns true; otherwise passes the request on
     * with {@link #connect(SocketAddress, SocketAddress, ChannelPromise)}.
     * Anything the handler throws is reported through
     * {@code notifyOutboundHandlerException}.
     *
     * @param remoteAddress the address to connect to
     * @param localAddress the local address to connect from
     * @param promise the promise for the operation
     */
    private void invokeConnect(SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise)
    {
        if (!invokeHandler())
        {
            connect(remoteAddress, localAddress, promise);
            return;
        }
        try
        {
            final ChannelOutboundHandler target = (ChannelOutboundHandler) handler();
            target.connect(this, remoteAddress, localAddress, promise);
        }
        catch (Throwable failure)
        {
            notifyOutboundHandlerException(failure, promise);
        }
    }

    /**
     * Requests a disconnect on the next outbound context. When the channel
     * metadata reports no support for disconnect, the request is turned into
     * a close instead. The request runs directly when the calling thread is
     * in the target's executor event loop, otherwise via {@code safeExecute}.
     *
     * @param promise the promise for the operation
     * @return {@code promise}
     */
    @Override
    public ChannelFuture disconnect(final ChannelPromise promise)
    {
        if (isNotValidPromise(promise, false))
        {
            // The promise was cancelled, so the operation is not performed.
            return promise;
        }

        final AbstractChannelHandlerContext target = findContextOutbound();
        final EventExecutor loop = target.executor();
        if (!loop.inEventLoop())
        {
            safeExecute(loop, new Runnable()
            {
                @Override
                public void run()
                {
                    if (channel().metadata().hasDisconnect())
                    {
                        target.invokeDisconnect(promise);
                    }
                    else
                    {
                        target.invokeClose(promise);
                    }
                }
            }, promise, null);
            return promise;
        }

        // A channel with no notion of disconnect-reconnect has its disconnect
        // translated into a close.
        if (channel().metadata().hasDisconnect())
        {
            target.invokeDisconnect(promise);
        }
        else
        {
            target.invokeClose(promise);
        }
        return promise;
    }

    /**
     * Calls {@code disconnect} on this context's handler when
     * {@code invokeHandler()} returns true; otherwise passes the request on
     * with {@link #disconnect(ChannelPromise)}. Anything the handler throws is
     * reported through {@code notifyOutboundHandlerException}.
     *
     * @param promise the promise for the operation
     */
    private void invokeDisconnect(ChannelPromise promise)
    {
        if (!invokeHandler())
        {
            disconnect(promise);
            return;
        }
        try
        {
            final ChannelOutboundHandler target = (ChannelOutboundHandler) handler();
            target.disconnect(this, promise);
        }
        catch (Throwable failure)
        {
            notifyOutboundHandlerException(failure, promise);
        }
    }

    /**
     * Requests a close on the next outbound context: directly when the calling
     * thread is in that context's executor event loop, otherwise via
     * {@code safeExecute} on that executor.
     *
     * @param promise the promise for the operation
     * @return {@code promise}
     */
    @Override
    public ChannelFuture close(final ChannelPromise promise)
    {
        if (isNotValidPromise(promise, false))
        {
            // The promise was cancelled, so the operation is not performed.
            return promise;
        }

        final AbstractChannelHandlerContext target = findContextOutbound();
        final EventExecutor loop = target.executor();
        if (loop.inEventLoop())
        {
            target.invokeClose(promise);
            return promise;
        }

        safeExecute(loop, new Runnable()
        {
            @Override
            public void run()
            {
                target.invokeClose(promise);
            }
        }, promise, null);
        return promise;
    }

    /**
     * Calls {@code close} on this context's handler when
     * {@code invokeHandler()} returns true; otherwise passes the request on
     * with {@link #close(ChannelPromise)}. Anything the handler throws is
     * reported through {@code notifyOutboundHandlerException}.
     *
     * @param promise the promise for the operation
     */
    private void invokeClose(ChannelPromise promise)
    {
        if (!invokeHandler())
        {
            close(promise);
            return;
        }
        try
        {
            final ChannelOutboundHandler target = (ChannelOutboundHandler) handler();
            target.close(this, promise);
        }
        catch (Throwable failure)
        {
            notifyOutboundHandlerException(failure, promise);
        }
    }

    /**
     * Requests a deregister on the next outbound context: directly when the
     * calling thread is in that context's executor event loop, otherwise via
     * {@code safeExecute} on that executor.
     *
     * @param promise the promise for the operation
     * @return {@code promise}
     */
    @Override
    public ChannelFuture deregister(final ChannelPromise promise)
    {
        if (isNotValidPromise(promise, false))
        {
            // The promise was cancelled, so the operation is not performed.
            return promise;
        }

        final AbstractChannelHandlerContext target = findContextOutbound();
        final EventExecutor loop = target.executor();
        if (loop.inEventLoop())
        {
            target.invokeDeregister(promise);
            return promise;
        }

        safeExecute(loop, new Runnable()
        {
            @Override
            public void run()
            {
                target.invokeDeregister(promise);
            }
        }, promise, null);
        return promise;
    }

    /**
     * Calls {@code deregister} on this context's handler when
     * {@code invokeHandler()} returns true; otherwise passes the request on
     * with {@link #deregister(ChannelPromise)}. Anything the handler throws is
     * reported through {@code notifyOutboundHandlerException}.
     *
     * @param promise the promise for the operation
     */
    private void invokeDeregister(ChannelPromise promise)
    {
        if (!invokeHandler())
        {
            deregister(promise);
            return;
        }
        try
        {
            final ChannelOutboundHandler target = (ChannelOutboundHandler) handler();
            target.deregister(this, promise);
        }
        catch (Throwable failure)
        {
            notifyOutboundHandlerException(failure, promise);
        }
    }

    /**
     * Requests a read on the next outbound context: directly when the calling
     * thread is in that context's executor event loop, otherwise through a
     * task that is created on first use, cached on the target context and
     * submitted to its executor.
     *
     * @return this context
     */
    @Override
    public ChannelHandlerContext read()
    {
        final AbstractChannelHandlerContext target = findContextOutbound();
        final EventExecutor loop = target.executor();
        if (loop.inEventLoop())
        {
            target.invokeRead();
            return this;
        }

        Runnable cached = target.invokeReadTask;
        if (cached == null)
        {
            cached = new Runnable()
            {
                @Override
                public void run()
                {
                    target.invokeRead();
                }
            };
            target.invokeReadTask = cached;
        }
        loop.execute(cached);
        return this;
    }

    /**
     * Calls {@code read} on this context's handler when
     * {@code invokeHandler()} returns true; otherwise passes the request on
     * with {@link #read()}. Anything the handler throws is passed to
     * {@code notifyHandlerException}.
     */
    private void invokeRead()
    {
        if (!invokeHandler())
        {
            read();
            return;
        }
        try
        {
            final ChannelOutboundHandler target = (ChannelOutboundHandler) handler();
            target.read(this);
        }
        catch (Throwable failure)
        {
            notifyHandlerException(failure);
        }
    }

    /**
     * Writes {@code msg} using a freshly created promise.
     *
     * @param msg the message to write
     * @return the promise that was created for the operation
     */
    @Override
    public ChannelFuture write(Object msg)
    {
        final ChannelPromise promise = newPromise();
        return write(msg, promise);
    }

    /**
     * Writes {@code msg} without flushing. The message is released when the
     * promise turns out to be cancelled or when validating the promise throws
     * a {@link RuntimeException}, which is then rethrown.
     *
     * @param msg the message to write; must not be {@code null}
     * @param promise the promise for the operation
     * @return {@code promise}
     * @throws NullPointerException if {@code msg} is {@code null}
     */
    @Override
    public ChannelFuture write(final Object msg, final ChannelPromise promise)
    {
        ObjectUtil.checkNotNull(msg, "msg");

        try
        {
            final boolean cancelled = isNotValidPromise(promise, true);
            if (cancelled)
            {
                // Nothing will consume the message, so drop our reference.
                ReferenceCountUtil.release(msg);
                return promise;
            }
        }
        catch (RuntimeException validationFailure)
        {
            ReferenceCountUtil.release(msg);
            throw validationFailure;
        }

        write(msg, false, promise);
        return promise;
    }

    /**
     * Hands the write to this context's handler via {@code invokeWrite0} when
     * {@code invokeHandler()} returns true; otherwise passes it on with
     * {@link #write(Object, ChannelPromise)}.
     *
     * @param msg the message to write
     * @param promise the promise for the operation
     */
    private void invokeWrite(Object msg, ChannelPromise promise)
    {
        if (!invokeHandler())
        {
            write(msg, promise);
            return;
        }
        invokeWrite0(msg, promise);
    }

    /**
     * Calls {@code write} on this context's handler. Anything the handler
     * throws is reported through {@code notifyOutboundHandlerException}.
     *
     * @param msg the message to write
     * @param promise the promise for the operation
     */
    private void invokeWrite0(Object msg, ChannelPromise promise)
    {
        try
        {
            final ChannelOutboundHandler target = (ChannelOutboundHandler) handler();
            target.write(this, msg, promise);
        }
        catch (Throwable failure)
        {
            notifyOutboundHandlerException(failure, promise);
        }
    }

    /**
     * Requests a flush on the next outbound context: directly when the calling
     * thread is in that context's executor event loop, otherwise through a
     * task that is created on first use, cached on the target context and
     * passed to {@code safeExecute} together with the channel's void promise.
     *
     * @return this context
     */
    @Override
    public ChannelHandlerContext flush()
    {
        final AbstractChannelHandlerContext target = findContextOutbound();
        final EventExecutor loop = target.executor();
        if (loop.inEventLoop())
        {
            target.invokeFlush();
            return this;
        }

        Runnable cached = target.invokeFlushTask;
        if (cached == null)
        {
            cached = new Runnable()
            {
                @Override
                public void run()
                {
                    target.invokeFlush();
                }
            };
            target.invokeFlushTask = cached;
        }
        safeExecute(loop, cached, channel().voidPromise(), null);
        return this;
    }

    /**
     * Hands the flush to this context's handler via {@code invokeFlush0} when
     * {@code invokeHandler()} returns true; otherwise passes it on with
     * {@link #flush()}.
     */
    private void invokeFlush()
    {
        if (!invokeHandler())
        {
            flush();
            return;
        }
        invokeFlush0();
    }

    /**
     * Calls {@code flush} on this context's handler. Anything the handler
     * throws is passed to {@code notifyHandlerException}.
     */
    private void invokeFlush0()
    {
        try
        {
            final ChannelOutboundHandler target = (ChannelOutboundHandler) handler();
            target.flush(this);
        }
        catch (Throwable failure)
        {
            notifyHandlerException(failure);
        }
    }

    /**
     * Writes {@code msg} and then flushes.
     *
     * <p>The message is released whenever it is not handed on to the
     * pipeline: when the promise was cancelled, and also when validating the
     * promise throws. The latter mirrors
     * {@link #write(Object, ChannelPromise)}; without it a rejected promise
     * would leak a reference-counted message.
     *
     * @param msg the message to write; must not be {@code null}
     * @param promise the promise for the operation
     * @return {@code promise}
     * @throws NullPointerException if {@code msg} is {@code null}, or if
     *         promise validation rejects a {@code null} promise
     * @throws IllegalArgumentException if promise validation rejects the
     *         promise
     */
    @Override
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise)
    {
        if (msg == null)
        {
            throw new NullPointerException("msg");
        }

        final boolean cancelled;
        try
        {
            cancelled = isNotValidPromise(promise, true);
        }
        catch (RuntimeException e)
        {
            // The caller gave up ownership of msg by calling us, so it must
            // not leak just because the promise was rejected.
            ReferenceCountUtil.release(msg);
            throw e;
        }

        if (cancelled)
        {
            ReferenceCountUtil.release(msg);
            return promise;
        }

        write(msg, true, promise);

        return promise;
    }

    /**
     * Runs {@code invokeWrite0} followed by {@code invokeFlush0} on this
     * context when {@code invokeHandler()} returns true; otherwise passes the
     * request on with {@link #writeAndFlush(Object, ChannelPromise)}.
     *
     * @param msg the message to write
     * @param promise the promise for the operation
     */
    private void invokeWriteAndFlush(Object msg, ChannelPromise promise)
    {
        if (!invokeHandler())
        {
            writeAndFlush(msg, promise);
            return;
        }
        invokeWrite0(msg, promise);
        invokeFlush0();
    }

    /**
     * Shared implementation of write and write-and-flush. The message is
     * passed through the pipeline's {@code touch} hook and handed to the next
     * outbound context: directly when the calling thread is in that context's
     * executor event loop, otherwise wrapped in a write task that is passed
     * to {@code safeExecute}.
     *
     * @param msg the message to write
     * @param flush whether a flush should follow the write
     * @param promise the promise for the operation
     */
    private void write(Object msg, boolean flush, ChannelPromise promise)
    {
        final AbstractChannelHandlerContext target = findContextOutbound();
        final Object touched = pipeline.touch(msg, target);
        final EventExecutor loop = target.executor();

        if (!loop.inEventLoop())
        {
            final AbstractWriteTask task;
            if (!flush)
            {
                task = WriteTask.newInstance(target, touched, promise);
            }
            else
            {
                task = WriteAndFlushTask.newInstance(target, touched, promise);
            }
            safeExecute(loop, task, promise, touched);
            return;
        }

        if (!flush)
        {
            target.invokeWrite(touched, promise);
        }
        else
        {
            target.invokeWriteAndFlush(touched, promise);
        }
    }

    /**
     * Writes and flushes {@code msg} using a freshly created promise.
     *
     * @param msg the message to write
     * @return the promise that was created for the operation
     */
    @Override
    public ChannelFuture writeAndFlush(Object msg)
    {
        final ChannelPromise promise = newPromise();
        return writeAndFlush(msg, promise);
    }

    /**
     * Tries to fail {@code promise} with {@code cause}. A
     * {@link VoidChannelPromise} is left untouched.
     *
     * @param cause the throwable raised by an outbound handler
     * @param promise the promise of the failed operation
     */
    private static void notifyOutboundHandlerException(Throwable cause,
            ChannelPromise promise)
    {
        if (promise instanceof VoidChannelPromise)
        {
            return;
        }
        PromiseNotificationUtil.tryFailure(promise, cause, logger);
    }

    /**
     * Routes a throwable raised by a handler to this context's
     * exception-caught handling. When {@code inExceptionCaught} reports that
     * the throwable came out of an {@code exceptionCaught} call, it is only
     * logged at WARN level and not dispatched again.
     *
     * @param cause the throwable raised by the handler
     */
    private void notifyHandlerException(Throwable cause)
    {
        if (!inExceptionCaught(cause))
        {
            invokeExceptionCaught(cause);
            return;
        }

        if (logger.isWarnEnabled())
        {
            logger.warn(
                    "An exception was thrown by a user handler "
                            + "while handling an exceptionCaught event",
                    cause);
        }
    }

    /**
     * Reports whether {@code cause}, or any throwable reachable through its
     * cause chain, has a stack frame whose method name is
     * {@code exceptionCaught}.
     *
     * <p>The walk stops as soon as the cause chain loops back on itself. Such
     * a loop can be built with {@link Throwable#initCause(Throwable)} and
     * would otherwise keep the calling thread spinning forever.
     *
     * @param cause the throwable to inspect; {@code null} yields {@code false}
     * @return {@code true} if an {@code exceptionCaught} frame was found
     */
    private static boolean inExceptionCaught(Throwable cause)
    {
        Throwable current = cause;
        // Follows the same chain at half speed. If current ever lands on it
        // again the chain is cyclic and every link has already been checked.
        Throwable trailing = cause;
        boolean advanceTrailing = false;

        while (current != null)
        {
            StackTraceElement[] trace = current.getStackTrace();
            if (trace != null)
            {
                for (StackTraceElement frame : trace)
                {
                    if (frame == null)
                    {
                        break;
                    }
                    if ("exceptionCaught".equals(frame.getMethodName()))
                    {
                        return true;
                    }
                }
            }

            current = current.getCause();
            if (advanceTrailing)
            {
                trailing = trailing.getCause();
            }
            advanceTrailing = !advanceTrailing;

            if (current != null && current == trailing)
            {
                // Cyclic cause chain with no exceptionCaught frame in it.
                return false;
            }
        }

        return false;
    }

    /**
     * Creates a new {@link DefaultChannelPromise} for this context's channel
     * and executor.
     */
    @Override
    public ChannelPromise newPromise()
    {
        final Channel ch = channel();
        final EventExecutor exec = executor();
        return new DefaultChannelPromise(ch, exec);
    }

    /**
     * Creates a new {@link DefaultChannelProgressivePromise} for this
     * context's channel and executor.
     */
    @Override
    public ChannelProgressivePromise newProgressivePromise()
    {
        final Channel ch = channel();
        final EventExecutor exec = executor();
        return new DefaultChannelProgressivePromise(ch, exec);
    }

    /**
     * Returns a succeeded future for this context's channel and executor. The
     * instance is created on first use and cached in a field afterwards.
     */
    @Override
    public ChannelFuture newSucceededFuture()
    {
        ChannelFuture cached = this.succeededFuture;
        if (cached != null)
        {
            return cached;
        }
        cached = new SucceededChannelFuture(channel(), executor());
        this.succeededFuture = cached;
        return cached;
    }

    /**
     * Creates a new failed future for this context's channel and executor.
     *
     * @param cause the failure to carry
     */
    @Override
    public ChannelFuture newFailedFuture(Throwable cause)
    {
        final Channel ch = channel();
        final EventExecutor exec = executor();
        return new FailedChannelFuture(ch, exec, cause);
    }

    /**
     * Validates a promise that was passed to an outbound operation.
     *
     * @param promise the promise to validate
     * @param allowVoidPromise whether a {@link VoidChannelPromise} is
     *        acceptable for the operation
     * @return {@code true} when the promise was already cancelled and the
     *         operation must be skipped, {@code false} when it may proceed
     * @throws NullPointerException if {@code promise} is {@code null}
     * @throws IllegalArgumentException if the promise is already done without
     *         being cancelled, belongs to another channel, is a void promise
     *         that is not allowed here, or is a channel close future
     */
    private boolean isNotValidPromise(ChannelPromise promise,
            boolean allowVoidPromise)
    {
        ObjectUtil.checkNotNull(promise, "promise");

        if (promise.isDone())
        {
            // A cancelled promise signals that the operation should simply
            // not be performed.
            //
            // See https://github.com/netty/netty/issues/2349
            if (!promise.isCancelled())
            {
                throw new IllegalArgumentException(
                        "promise already done: " + promise);
            }
            return true;
        }

        if (promise.channel() != channel())
        {
            final String mismatch = String.format(
                    "promise.channel does not match: %s (expected: %s)",
                    promise.channel(), channel());
            throw new IllegalArgumentException(mismatch);
        }

        // The plain default promise needs none of the type checks below.
        final boolean plainDefaultPromise = promise
                .getClass() == DefaultChannelPromise.class;
        if (!plainDefaultPromise)
        {
            if (!allowVoidPromise && promise instanceof VoidChannelPromise)
            {
                throw new IllegalArgumentException(
                        StringUtil.simpleClassName(VoidChannelPromise.class)
                                + " not allowed for this operation");
            }

            if (promise instanceof AbstractChannel.CloseFuture)
            {
                throw new IllegalArgumentException(StringUtil
                        .simpleClassName(AbstractChannel.CloseFuture.class)
                        + " not allowed in a pipeline");
            }
        }
        return false;
    }

    /**
     * Follows the {@code next} links, starting with the context right after
     * this one, and returns the first context whose {@code inbound} flag is
     * set.
     *
     * The walk has no {@code null} check, so it relies on an inbound context
     * being reachable (presumably the pipeline's tail; confirm against the
     * pipeline implementation).
     *
     * @return the first following context flagged as inbound
     */
    private AbstractChannelHandlerContext findContextInbound()
    {
        AbstractChannelHandlerContext candidate = next;
        while (!candidate.inbound)
        {
            candidate = candidate.next;
        }
        return candidate;
    }

    /**
     * Follows the {@code prev} links, starting with the context right before
     * this one, and returns the first context whose {@code outbound} flag is
     * set.
     *
     * The walk has no {@code null} check, so it relies on an outbound context
     * being reachable (presumably the pipeline's head; confirm against the
     * pipeline implementation).
     *
     * @return the first preceding context flagged as outbound
     */
    private AbstractChannelHandlerContext findContextOutbound()
    {
        AbstractChannelHandlerContext candidate = prev;
        while (!candidate.outbound)
        {
            candidate = candidate.prev;
        }
        return candidate;
    }

    /**
     * Returns the void promise of the channel this context belongs to, by
     * delegating to {@code channel().voidPromise()}.
     *
     * @return the channel's void promise
     */
    @Override
    public ChannelPromise voidPromise()
    {
        final ChannelPromise promise = channel().voidPromise();
        return promise;
    }

    /**
     * Unconditionally moves this context's {@code handlerState} to
     * {@code REMOVE_COMPLETE}.
     */
    final void setRemoved()
    {
        this.handlerState = REMOVE_COMPLETE;
    }

    /**
     * Moves {@code handlerState} to {@code ADD_COMPLETE} unless it is already
     * {@code REMOVE_COMPLETE}, retrying the compare-and-set until one of the
     * two outcomes is reached.
     */
    final void setAddComplete()
    {
        int observed;
        do
        {
            observed = handlerState;
            // Never overwrite REMOVE_COMPLETE. The observed state is usually
            // ADD_PENDING, but it can already be REMOVE_COMPLETE when an
            // EventExecutor is used that does not expose ordering guarantees.
            if (observed == REMOVE_COMPLETE)
            {
                return;
            }
        }
        while (!HANDLER_STATE_UPDATER.compareAndSet(this, observed,
                ADD_COMPLETE));
    }

    /**
     * Attempts the {@code INIT} to {@code ADD_PENDING} transition of
     * {@code handlerState} with a single compare-and-set.
     */
    final void setAddPending()
    {
        // The compare-and-set is kept outside the assert statement so that it
        // still runs when assertions are disabled.
        final boolean transitioned = HANDLER_STATE_UPDATER.compareAndSet(this,
                INIT, ADD_PENDING);

        // This should always be true as it MUST be called before
        // setAddComplete() or setRemoved().
        assert transitioned;
    }

    /**
     * Makes a best-effort attempt to detect whether
     * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} has been
     * called yet. Returns {@code false} if it has not, and {@code true} if it
     * has or if that could not be determined.
     *
     * When this method returns {@code false} the {@link ChannelHandler} is not
     * invoked and the event is just forwarded. This is needed because
     * {@link DefaultChannelPipeline} may already have put the
     * {@link ChannelHandler} into the linked list without having called
     * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}.
     */
    private boolean invokeHandler()
    {
        // Read the volatile field once and work on the snapshot.
        final int state = this.handlerState;
        if (state == ADD_COMPLETE)
        {
            return true;
        }
        // Without ordering a pending add is treated as good enough.
        return !ordered && state == ADD_PENDING;
    }

    /**
     * Reports whether {@code handlerState} currently equals
     * {@code REMOVE_COMPLETE}.
     *
     * @return {@code true} if this context is in the {@code REMOVE_COMPLETE}
     *         state
     */
    @Override
    public boolean isRemoved()
    {
        final int state = handlerState;
        return state == REMOVE_COMPLETE;
    }

    /**
     * Looks up the attribute for {@code key} on the channel of this context;
     * the call is delegated to {@code channel().attr(key)}.
     *
     * @param key
     *            the attribute key
     * @return the attribute returned by the channel for {@code key}
     */
    @Override
    public <T> Attribute<T> attr(AttributeKey<T> key)
    {
        final Attribute<T> attribute = channel().attr(key);
        return attribute;
    }

    /**
     * Asks the channel of this context whether it has an attribute for
     * {@code key}; the call is delegated to {@code channel().hasAttr(key)}.
     *
     * @param key
     *            the attribute key
     * @return the channel's answer for {@code key}
     */
    @Override
    public <T> boolean hasAttr(AttributeKey<T> key)
    {
        final boolean present = channel().hasAttr(key);
        return present;
    }

    /**
     * Submits {@code runnable} to {@code executor}. If the submission throws,
     * the promise is failed with the thrown cause and {@code msg}, when not
     * {@code null}, is released.
     *
     * @param executor
     *            the executor the task is submitted to
     * @param runnable
     *            the task to submit
     * @param promise
     *            the promise to fail when the submission throws
     * @param msg
     *            the message to release when the submission throws; may be
     *            {@code null}
     */
    private static void safeExecute(EventExecutor executor, Runnable runnable,
            ChannelPromise promise, Object msg)
    {
        Throwable rejection = null;
        try
        {
            executor.execute(runnable);
        }
        catch (Throwable thrown)
        {
            rejection = thrown;
        }

        if (rejection == null)
        {
            return;
        }

        try
        {
            promise.setFailure(rejection);
        }
        finally
        {
            // Release inside finally so the message is let go even if failing
            // the promise throws.
            if (msg != null)
            {
                ReferenceCountUtil.release(msg);
            }
        }
    }

    /**
     * Builds the hint text for this context: the context name in single
     * quotes followed by a fixed sentence.
     *
     * @return the hint text
     */
    @Override
    public String toHintString()
    {
        final StringBuilder hint = new StringBuilder();
        hint.append('\'');
        hint.append(name);
        hint.append("' will handle the message from this point.");
        return hint.toString();
    }

    /**
     * Renders this context as the simple class name of
     * {@link ChannelHandlerContext} followed by the context name and the
     * channel in parentheses.
     *
     * @return the textual form of this context
     */
    @Override
    public String toString()
    {
        final StringBuilder text = new StringBuilder();
        text.append(StringUtil.simpleClassName(ChannelHandlerContext.class));
        text.append('(');
        text.append(name);
        text.append(", ");
        text.append(channel());
        text.append(')');
        return text.toString();
    }

    /**
     * Base class of the recyclable write tasks. A task carries the context,
     * message and promise of one write. On submit it can add an estimated
     * size to the channel's pending outbound bytes, and {@link #run()}
     * subtracts it again before performing the write.
     */
    abstract static class AbstractWriteTask implements Runnable
    {

        private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT = SystemPropertyUtil
                .getBoolean("io.netty.transport.estimateSizeOnSubmit", true);

        // Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and
        // one int field, plus alignment.
        // NOTE(review): this class declares four reference fields (handle,
        // ctx, msg, promise); confirm whether the default is still intended.
        private static final int WRITE_TASK_OVERHEAD = SystemPropertyUtil
                .getInt("io.netty.transport.writeTaskSizeOverhead", 48);

        // Handle used to give this instance back once run() has finished.
        private final Recycler.Handle<AbstractWriteTask> handle;

        // State of the write currently carried by this task; cleared in
        // run().
        private AbstractChannelHandlerContext ctx;

        private Object msg;

        private ChannelPromise promise;

        // Amount added to the pending outbound bytes in init(); 0 when
        // nothing was added.
        private int size;

        @SuppressWarnings("unchecked")
        private AbstractWriteTask(
                Recycler.Handle<? extends AbstractWriteTask> handle)
        {
            this.handle = (Recycler.Handle<AbstractWriteTask>) handle;
        }

        /**
         * Fills {@code task} with the given write and, when size estimation
         * is enabled and the channel still has an outbound buffer, adds the
         * estimated task size to that buffer's pending bytes.
         *
         * @param task
         *            the task to initialise
         * @param ctx
         *            the context the write will be invoked on
         * @param msg
         *            the message to write
         * @param promise
         *            the promise of the write
         */
        protected static void init(AbstractWriteTask task,
                AbstractChannelHandlerContext ctx, Object msg,
                ChannelPromise promise)
        {
            task.ctx = ctx;
            task.msg = msg;
            task.promise = promise;

            if (ESTIMATE_TASK_SIZE_ON_SUBMIT)
            {
                final ChannelOutboundBuffer outbound = ctx.channel().unsafe()
                        .outboundBuffer();

                // The buffer may be null if the channel is closed already.
                if (outbound != null)
                {
                    task.size = ctx.pipeline.estimatorHandle().size(msg)
                            + WRITE_TASK_OVERHEAD;
                    outbound.incrementPendingOutboundBytes(task.size);
                    return;
                }
            }

            // Estimation disabled or no outbound buffer: nothing is accounted.
            task.size = 0;
        }

        /**
         * Subtracts the accounted size (when estimation is enabled and an
         * outbound buffer exists), performs the write, and finally clears the
         * task's references and recycles it.
         */
        @Override
        public final void run()
        {
            try
            {
                final ChannelOutboundBuffer outbound = ctx.channel().unsafe()
                        .outboundBuffer();
                // The buffer may be null if the channel is closed already.
                if (outbound != null && ESTIMATE_TASK_SIZE_ON_SUBMIT)
                {
                    outbound.decrementPendingOutboundBytes(size);
                }
                write(ctx, msg, promise);
            }
            finally
            {
                // Drop the references so the GC can collect them directly,
                // then give the instance back through its handle.
                promise = null;
                msg = null;
                ctx = null;
                handle.recycle(this);
            }
        }

        /**
         * Performs the write by calling {@code ctx.invokeWrite(msg, promise)}.
         *
         * @param ctx
         *            the context to invoke the write on
         * @param msg
         *            the message to write
         * @param promise
         *            the promise of the write
         */
        protected void write(AbstractChannelHandlerContext ctx, Object msg,
                ChannelPromise promise)
        {
            ctx.invokeWrite(msg, promise);
        }
    }

    /**
     * Recycled task that performs a plain write using the inherited
     * {@code write(...)} implementation. It is tagged with
     * {@link SingleThreadEventLoop.NonWakeupRunnable}.
     */
    static final class WriteTask extends AbstractWriteTask
            implements SingleThreadEventLoop.NonWakeupRunnable
    {

        private WriteTask(Recycler.Handle<WriteTask> handle)
        {
            super(handle);
        }

        // Source of WriteTask instances; creates a new one when the recycler
        // asks for it.
        private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>()
        {
            @Override
            protected WriteTask newObject(Handle<WriteTask> recyclerHandle)
            {
                return new WriteTask(recyclerHandle);
            }
        };

        /**
         * Obtains a task from the recycler and initialises it with the given
         * write.
         *
         * @param ctx
         *            the context the write will be invoked on
         * @param msg
         *            the message to write
         * @param promise
         *            the promise of the write
         * @return the initialised task
         */
        private static WriteTask newInstance(AbstractChannelHandlerContext ctx,
                Object msg, ChannelPromise promise)
        {
            final WriteTask recycled = RECYCLER.get();
            init(recycled, ctx, msg, promise);
            return recycled;
        }
    }

    /**
     * Recycled task that performs a write and then invokes a flush on the same
     * context.
     */
    static final class WriteAndFlushTask extends AbstractWriteTask
    {

        private WriteAndFlushTask(Recycler.Handle<WriteAndFlushTask> handle)
        {
            super(handle);
        }

        // Source of WriteAndFlushTask instances; creates a new one when the
        // recycler asks for it.
        private static final Recycler<WriteAndFlushTask> RECYCLER = new Recycler<WriteAndFlushTask>()
        {
            @Override
            protected WriteAndFlushTask newObject(
                    Handle<WriteAndFlushTask> recyclerHandle)
            {
                return new WriteAndFlushTask(recyclerHandle);
            }
        };

        /**
         * Obtains a task from the recycler and initialises it with the given
         * write.
         *
         * @param ctx
         *            the context the write and flush will be invoked on
         * @param msg
         *            the message to write
         * @param promise
         *            the promise of the write
         * @return the initialised task
         */
        private static WriteAndFlushTask newInstance(
                AbstractChannelHandlerContext ctx, Object msg,
                ChannelPromise promise)
        {
            final WriteAndFlushTask recycled = RECYCLER.get();
            init(recycled, ctx, msg, promise);
            return recycled;
        }

        /**
         * Performs the inherited write and then calls
         * {@code ctx.invokeFlush()}.
         */
        @Override
        public void write(AbstractChannelHandlerContext ctx, Object msg,
                ChannelPromise promise)
        {
            super.write(ctx, msg, promise);
            ctx.invokeFlush();
        }
    }
}
